package net.quanter.shield.mq.rocketmq.producer;


import net.quanter.shield.common.dto.result.ResultDTO;
import net.quanter.shield.mq.MQMessageVO;
import net.quanter.shield.mq.MQProducer;
import net.quanter.shield.mq.rocketmq.enums.RocketMqType;
import net.quanter.shield.mq.rocketmq.param.RocketMQBorkerParam;
import net.quanter.shield.mq.rocketmq.param.RocketMQTopicParam;

import java.util.Map;

/**
 * Wraps a concrete {@link MQProducer} (HTTP, TCP or community flavour) bound to a single
 * topic, and runs an optional {@link MQMessageSendHandler} before and after every send.
 */
public class DefaultMQPushProducerWrapper {
    /** Returned when the send handler's {@code beforeSend} hook refuses the message (code 7002). */
    static final ResultDTO cannotSendMQResultDTO = ResultDTO.failure("message can't to be send", 7002);
    /** Returned when a required argument is {@code null} (code 7003). */
    static final ResultDTO messageCantNotBeNull = ResultDTO.failure("message can't be null", 7003);
    final MQProducer rocketMQProducer;
    final MQMessageSendHandler mqMessageSendHandler;
    final RocketMQTopicParam topic;

    /**
     * Creates the wrapper and the underlying producer selected by {@code type}.
     *
     * @param type                 producer flavour; {@code HTTP} and {@code TCP} select their
     *                             dedicated producers, any other value selects the community producer
     * @param producerGroup        producer group, only passed to the community producer
     * @param mqConnectVO          broker parameters handed to the underlying producer
     * @param topic                topic every message of this wrapper is built for
     * @param mqMessageSendHandler optional before/after send hooks; may be {@code null}
     * @throws Exception if the underlying producer cannot be constructed
     */
    public DefaultMQPushProducerWrapper(
            RocketMqType type,
            String producerGroup,
            RocketMQBorkerParam mqConnectVO,
            RocketMQTopicParam topic,
            MQMessageSendHandler mqMessageSendHandler
    ) throws Exception {
        this.mqMessageSendHandler = mqMessageSendHandler;
        this.topic = topic;
        if (type == RocketMqType.HTTP) {
            rocketMQProducer = new RocketMQHttpProducer(mqConnectVO, topic);
        } else if (type == RocketMqType.TCP) {
            rocketMQProducer = new RocketMQTcpProducer(mqConnectVO, topic);
        } else {
            rocketMQProducer = new RocketMQCommunityProducer(mqConnectVO, topic, producerGroup);
        }
    }

    /**
     * Narrows one of the shared raw failure results to the caller's generic type.
     * Kept as the single place where the unchecked conversion happens.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static <T> ResultDTO<MQMessageVO<T>> asTyped(ResultDTO raw) {
        return raw;
    }

    /**
     * Builds a message for this wrapper's topic and sends it through the underlying producer.
     *
     * <p>All other {@code sendMessage} overloads delegate here. If a send handler is
     * configured, its {@code beforeSend} hook may veto the send, and its {@code afterSend}
     * hook is invoked with the producer's result whether or not the send succeeded.
     *
     * @param tag        message tag passed to the producer
     * @param shardKey   shard key passed to the producer
     * @param obj        message payload; must not be {@code null}
     * @param properties extra message properties; may be {@code null}
     * @param messageId  message id passed to the producer; may be {@code null}
     * @return a success result carrying the built message when the producer reports success;
     *         the producer's own result when it reports failure; the 7002 failure when
     *         {@code beforeSend} returns {@code false}; the 7003 failure when {@code obj}
     *         is {@code null}
     */
    public <T> ResultDTO<MQMessageVO<T>> sendMessage(
            String tag,
            String shardKey,
            T obj,
            Map<String, String> properties,
            String messageId) {
        // Reject a null payload here so every overload behaves the same; previously only the
        // two- and three-argument overloads performed this check.
        if (obj == null) {
            return asTyped(messageCantNotBeNull);
        }
        MQMessageVO<T> mqMessageVO = rocketMQProducer.getMQMessage(obj, tag, shardKey, topic.getName(), properties, messageId);

        // Pre-send hook: the handler may veto the send.
        if (mqMessageSendHandler != null && !mqMessageSendHandler.beforeSend(mqMessageVO)) {
            return asTyped(cannotSendMQResultDTO);
        }

        Object sourceMessage = rocketMQProducer.getSourceMessageFromMQMessage(mqMessageVO);
        ResultDTO sendResult = rocketMQProducer.send(mqMessageVO, sourceMessage);

        // Post-send hook: runs for both successful and failed sends.
        if (mqMessageSendHandler != null) {
            mqMessageSendHandler.afterSend(sendResult, mqMessageVO);
        }
        if (sendResult.isSuccess()) {
            return ResultDTO.success(mqMessageVO);
        }
        return asTyped(sendResult);
    }

    /**
     * Sends {@code obj} under {@code tag}, using {@code obj.toString()} as the shard key.
     *
     * @param tag message tag; must not be {@code null}
     * @param obj message payload; must not be {@code null}
     * @return the 7003 failure when either argument is {@code null}, otherwise the result of
     *         the five-argument overload
     */
    public <T> ResultDTO<MQMessageVO<T>> sendMessage(
            String tag,
            T obj) {
        if (obj == null || tag == null) {
            return asTyped(messageCantNotBeNull);
        }
        return sendMessage(tag, obj.toString(), obj, null, null);
    }

    /**
     * Sends {@code obj} under {@code tag} with an explicit shard key.
     *
     * @param tag      message tag; must not be {@code null}
     * @param shardKey shard key passed to the producer
     * @param obj      message payload; must not be {@code null}
     * @return the 7003 failure when {@code tag} or {@code obj} is {@code null}, otherwise the
     *         result of the five-argument overload
     */
    public <T> ResultDTO<MQMessageVO<T>> sendMessage(
            String tag,
            String shardKey,
            T obj) {
        if (obj == null || tag == null) {
            return asTyped(messageCantNotBeNull);
        }
        return sendMessage(tag, shardKey, obj, null, null);
    }

    /**
     * Sends {@code obj} with extra message properties and no explicit message id.
     *
     * @param tag        message tag passed to the producer
     * @param shardKey   shard key passed to the producer
     * @param obj        message payload; must not be {@code null}
     * @param properties extra message properties; may be {@code null}
     * @return the result of the five-argument overload
     */
    public <T> ResultDTO<MQMessageVO<T>> sendMessage(
            String tag,
            String shardKey,
            T obj,
            Map<String, String> properties) {
        return sendMessage(tag, shardKey, obj, properties, null);
    }

    /** Closes the underlying producer. */
    public void close() {
        rocketMQProducer.close();
    }

}
